Conversation
When a job runs longer than its concurrency_duration, the semaphore expires and gets deleted by the dispatcher's concurrency maintenance. Previously, BlockedExecution.releasable would consider a concurrency key releasable when its semaphore was missing, causing blocked jobs to be released and run concurrently with the still-executing job. This violated the concurrency guarantee: with limits_concurrency set to 1, two jobs with the same concurrency key could run simultaneously if the first job exceeded its concurrency_duration. Fix: Check for claimed executions before marking a concurrency key as releasable. A key is only releasable when: - Its semaphore is missing OR has available slots, AND - No jobs with that key are currently being executed (claimed) This ensures concurrency limits are respected even when jobs exceed their configured duration.
…hutdown When a worker gracefully shuts down, claimed executions are released back to ready state via ClaimedExecution#release. Previously, this always used dispatch_bypassing_concurrency_limits, which ignored whether another job with the same concurrency key was already running. This could cause duplicate concurrent executions in the following scenario: 1. Job A starts running with concurrency key "X" (semaphore value = 0) 2. Time passes, semaphore expires and is deleted 3. Job B with same key "X" enqueues, creates new semaphore, starts running 4. Worker running Job A receives shutdown signal 5. Job A is released via dispatch_bypassing_concurrency_limits 6. Job A goes to ready state, gets picked up by another worker 7. Both Job A and Job B now running concurrently (violates limit!) Fix: Before releasing a job, check if any other jobs with the same concurrency key are currently executing. If so, go through normal dispatch which respects the concurrency policy (block or discard). If not, continue to bypass limits for performance. This ensures that graceful shutdown doesn't violate concurrency guarantees, even when semaphores have expired during long-running job execution.
|
Is the issue that it violates the expectation? Because this seems like expected behavior - if the duration elapsed, then you don't lock the job anymore. That job just exists outside of the concurrency limits now? |
Exactly, even with a 12 hour limit on the concurrency I saw endless duplicate jobs for two different reasons, both fixed here and one being exactly what you just insinuated. Retry, restarts, OOM errors, etc cause a number of duplicates which this PR resolves. Ultimately, that worker will be rewritten anyway but it seemed like the locking mechanism has some flaws/bleeds when I looked at it. |
I am currently trying things to fix all the duplicate, very long-running jobs that are crashing our system.
Prevent blocked jobs from being released while job is still executing
When a job runs longer than its concurrency_duration, the semaphore expires and gets deleted by the dispatcher's concurrency maintenance. Previously, BlockedExecution.releasable would consider a concurrency key releasable when its semaphore was missing, causing blocked jobs to be released and run concurrently with the still-executing job.
This violated the concurrency guarantee: with limits_concurrency set to 1, two jobs with the same concurrency key could run simultaneously if the first job exceeded its concurrency_duration.
Fix: Check for claimed executions before marking a concurrency key as releasable. A key is only releasable when:
This ensures concurrency limits are respected even when jobs exceed their configured duration.
Respect concurrency limits when releasing claimed executions during shutdown
When a worker gracefully shuts down, claimed executions are released back
to ready state via ClaimedExecution#release. Previously, this always used
dispatch_bypassing_concurrency_limits, which ignored whether another job
with the same concurrency key was already running.
This could cause duplicate concurrent executions in the following scenario:
Fix: Before releasing a job, check if any other jobs with the same
concurrency key are currently executing. If so, go through normal dispatch
which respects the concurrency policy (block or discard). If not, continue
to bypass limits for performance.
This ensures that graceful shutdown doesn't violate concurrency guarantees,
even when semaphores have expired during long-running job execution.